package org.nbict.iot.trend;

import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

/**
 * Created by songseven on 18/6/29.
 */
public class MovingAverageFunction extends BaseFunction {

    private EWMA ewma;
    private EWMA.Time emitRaterPer;

    public MovingAverageFunction(EWMA ewma, EWMA.Time emitRaterPer) {
        this.ewma = ewma;
        this.emitRaterPer = emitRaterPer;
    }

    @Override
    public void execute(TridentTuple tuple, TridentCollector
            collector) {
        this.ewma.mark(tuple.getLong(0));
        collector.emit(new Values(this.ewma.getAverageRatePer(this
                .emitRaterPer)));
    }
}
